{
  "cells": [
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "Copyright (c) Microsoft Corporation. All rights reserved.\n",
        "\n",
        "Licensed under the MIT License."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "![Impressions](https://PixelServer20190423114238.azurewebsites.net/api/impressions/MachineLearningNotebooks/how-to-use-azureml/azure-arcadia/spark_session_on_synapse_spark_pool.png)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Interactive Spark Session on Synapse Spark Pool"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Install package"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "!pip install -U \"azureml-synapse\""
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "For JupyterLab, please additionally run:"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "!jupyter lab build --minimize=False"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## PLEASE restart kernel and then refresh web page before starting spark session."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## 0. How to leverage Spark Magic for interactive Spark experience"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "execution": {
          "iopub.execute_input": "2020-06-05T03:22:14.965395Z",
          "iopub.status.busy": "2020-06-05T03:22:14.965395Z",
          "iopub.status.idle": "2020-06-05T03:22:14.970398Z",
          "shell.execute_reply": "2020-06-05T03:22:14.969397Z",
          "shell.execute_reply.started": "2020-06-05T03:22:14.965395Z"
        }
      },
      "outputs": [],
      "source": [
        "# show help\n",
        "%synapse ?"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## 1. Start Synapse Session"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "synapse_compute_name=os.getenv(\"SYNAPSE_COMPUTE_NAME\", \"<my-synapse-compute-name>\")"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# use Synapse compute linked to the Compute Instance's workspace with an aml envrionment.\n",
        "# conda dependencies specified in the environment will be installed before the spark session started.\n",
        "\n",
        "%synapse start -c $synapse_compute_name -e AzureML-Minimal"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# use Synapse compute from anther workspace via its config file\n",
        "\n",
        "# %synapse start -c <compute-name> -f config.json"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# use Synapse compute from anther workspace via subscription_id, resource_group and workspace_name\n",
        "\n",
        "# %synapse start -c <compute-name> -s <subscription-id> -r <resource group> -w <workspace-name>"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# start a spark session with an AML environment, \n",
        "# %synapse start -c <compute-name> -s <subscription-id> -r <resource group> -w <workspace-name> -e AzureML-Minimal"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## 2. Data prepration\n",
        "\n",
        "Three types of datastore are supported in synapse spark, and you have two ways to load the data.\n",
        "\n",
        "\n",
        "| Datastore Type     | Data Acess                    |\n",
        "|--------------------|-------------------------------|\n",
        "| Blob               | Credential                    |\n",
        "| Adlsgen1           | Credential & Credential-less  |\n",
        "| Adlsgen2           | Credential & Credential-less  |"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Example 1: Data loading by HDFS path"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "**Read data from Blob**\n",
        "\n",
        "```python\n",
        "# setup access key or sas token\n",
        "\n",
        "sc._jsc.hadoopConfiguration().set(\"fs.azure.account.key.<storage account name>.blob.core.windows.net\", \"<acess key>\")\n",
        "sc._jsc.hadoopConfiguration().set(\"fs.azure.sas.<container name>.<storage account name>.blob.core.windows.net\", \"sas token\")\n",
        "\n",
        "df = spark.read.parquet(\"wasbs://<container name>@<storage account name>.blob.core.windows.net/<path>\")\n",
        "```\n",
        "\n",
        "**Read data from Adlsgen1**\n",
        "\n",
        "```python\n",
        "# setup service pricinpal which has access of the data\n",
        "# If no data Credential is setup, the user identity will be used to do access control\n",
        "\n",
        "sc._jsc.hadoopConfiguration().set(\"fs.adl.account.<storage account name>.oauth2.access.token.provider.type\",\"ClientCredential\")\n",
        "sc._jsc.hadoopConfiguration().set(\"fs.adl.account.<storage account name>.oauth2.client.id\", \"<client id>\")\n",
        "sc._jsc.hadoopConfiguration().set(\"fs.adl.account.<storage account name>.oauth2.credential\", \"<client secret>\")\n",
        "sc._jsc.hadoopConfiguration().set(\"fs.adl.account.<storage account name>.oauth2.refresh.url\", \"https://login.microsoftonline.com/<tenant id>/oauth2/token\")\n",
        "\n",
        "df = spark.read.csv(\"adl://<storage account name>.azuredatalakestore.net/<path>\")\n",
        "```\n",
        "\n",
        "**Read data from Adlsgen2**\n",
        "\n",
        "```python\n",
        "# setup service pricinpal which has access of the data\n",
        "# If no data Credential is setup, the user identity will be used to do access control\n",
        "\n",
        "sc._jsc.hadoopConfiguration().set(\"fs.azure.account.auth.type.<storage account name>.dfs.core.windows.net\",\"OAuth\")\n",
        "sc._jsc.hadoopConfiguration().set(\"fs.azure.account.oauth.provider.type.<storage account name>.dfs.core.windows.net\", \"org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider\")\n",
        "sc._jsc.hadoopConfiguration().set(\"fs.azure.account.oauth2.client.id.<storage account name>.dfs.core.windows.net\", \"<client id>\")\n",
        "sc._jsc.hadoopConfiguration().set(\"fs.azure.account.oauth2.client.secret.<storage account name>.dfs.core.windows.net\", \"<client secret>\")\n",
        "sc._jsc.hadoopConfiguration().set(\"fs.azure.account.oauth2.client.endpoint.<storage account name>.dfs.core.windows.net\", \"https://login.microsoftonline.com/<tenant id>/oauth2/token\")\n",
        "\n",
        "df = spark.read.csv(\"abfss://<container name>@<storage account>.dfs.core.windows.net/<path>\")\n",
        "```"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "execution": {
          "iopub.execute_input": "2020-06-04T08:11:18.812276Z",
          "iopub.status.busy": "2020-06-04T08:11:18.812276Z",
          "iopub.status.idle": "2020-06-04T08:11:23.854526Z",
          "shell.execute_reply": "2020-06-04T08:11:23.853525Z",
          "shell.execute_reply.started": "2020-06-04T08:11:18.812276Z"
        }
      },
      "outputs": [],
      "source": [
        "%%synapse\n",
        "\n",
        "from pyspark.sql.functions import col, desc\n",
        "\n",
        "df = spark.read.option(\"header\", \"true\").csv(\"wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv\")\n",
        "df.filter(col('Survived') == 1).groupBy('Age').count().orderBy(desc('count')).show(10)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Example 2: Data loading by AML Dataset\n",
        "\n",
        "You can create tabular data by following the [guidance](https://docs.microsoft.com/en-us/azure/machine-learning/how-to-create-register-datasets) and use to_spark_dataframe() to load the data.\n",
        "\n",
        "```text\n",
        "%%synapse\n",
        "\n",
        "import azureml.core\n",
        "print(azureml.core.VERSION)\n",
        "\n",
        "from azureml.core import Workspace, Dataset\n",
        "ws = Workspace.get(name='<workspace name>', subscription_id='<subscription id>', resource_group='<resource group>')\n",
        "ds = Dataset.get_by_name(ws, \"<tabular dataset name>\")\n",
        "df = ds.to_spark_dataframe()\n",
        "\n",
        "# You can do more data transformation on spark dataframe\n",
        "```"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## 3. Session Metadata\n",
        "After session started, you can check the session's metadata, find the links to Synapse portal."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "%synapse meta"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## 4. Stop Session\n",
        "When current session reach the status timeout, dead or any failure, you must explicitly stop it before start new one. "
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "%synapse stop"
      ]
    }
  ],
  "metadata": {
    "authors": [
      {
        "name": "yunzhan"
      }
    ],
    "kernelspec": {
      "display_name": "Python 3.8 - AzureML",
      "language": "python",
      "name": "python38-azureml"
    },
    "language_info": {
      "codemirror_mode": {
        "name": "ipython",
        "version": 3
      },
      "file_extension": ".py",
      "mimetype": "text/x-python",
      "name": "python",
      "nbconvert_exporter": "python",
      "pygments_lexer": "ipython3",
      "version": "3.6.7"
    },
    "nteract": {
      "version": "0.28.0"
    }
  },
  "nbformat": 4,
  "nbformat_minor": 4
}